package com.saga.energy.app;

import com.alibaba.fastjson.JSONArray;
import com.saga.energy.bean.User;
import com.saga.energy.sink.ZJSpaceToMysql;
import com.saga.energy.sink.ZJSpaceToMysql2;
import com.saga.energy.source.ZJMySQLSource;
import com.saga.energy.util.HttpUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Entry point of a Flink streaming job.
 *
 * <p>The pipeline has three stages, each pinned to parallelism 1:
 * <ol>
 *   <li>{@link ZJMySQLSource} emits {@link User} records;</li>
 *   <li>each user is converted to a {@link JSONArray} by passing its id to
 *       {@code HttpUtils.getDateInfoByURL};</li>
 *   <li>the resulting arrays are handed to {@link ZJSpaceToMysql2}.</li>
 * </ol>
 */
public class ZJToMySQL {

    /**
     * Builds the source -> map -> sink topology and submits it for execution.
     *
     * @param args command-line arguments; not read by this method
     * @throws Exception if the job fails to build or execute
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);

        // Stage 1: read users.
        final DataStreamSource<User> users = environment.addSource(new ZJMySQLSource());
        users.setParallelism(1);

        // Stage 2: look up the per-user data.
        final SingleOutputStreamOperator<JSONArray> dateInfo = users.map(new UserDateInfoLookup());
        dateInfo.setParallelism(1);

        // Stage 3: persist. ZJSpaceToMysql is an alternative sink that is currently not wired in.
        dateInfo.addSink(new ZJSpaceToMysql2()).setParallelism(1);

        environment.execute();
    }

    /**
     * Maps a {@link User} to the {@link JSONArray} returned by
     * {@code HttpUtils.getDateInfoByURL} for that user's id.
     */
    private static final class UserDateInfoLookup implements MapFunction<User, JSONArray> {

        @Override
        public JSONArray map(User user) throws Exception {
            // NOTE(review): presumably a remote call, judging by the helper's name — confirm in HttpUtils.
            return HttpUtils.getDateInfoByURL(user.getId());
        }
    }
}
